package com.atguigu.day05;

import com.atguigu.bean.WaterSensor;
import com.atguigu.func.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * @author Felix
 * @date 2024/7/13
 * 该案例演示了滚动处理时间窗口对象的创建以及窗口处理函数reduce
 * 需求：从指定的网络端口读取水位数据，统计最近10s相同传感器采集的水位和
 */
public class Flink05_Window_Tumb_Process_Reduce {
    public static void main(String[] args) throws Exception {
        //TODO 1.指定流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 2.从指定的网络端口读取数据
        DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 8888);
        //TODO 3.对读取的数据进行类型转换   String->Watersensor
        SingleOutputStreamOperator<WaterSensor> wsDS = socketDS.map(new WaterSensorMapFunction());
        //TODO 4.按照传感器id进行分组
        KeyedStream<WaterSensor, String> keyedDS = wsDS.keyBy(WaterSensor::getId);
        //TODO 5.开窗 --- 滚动处理时间窗口
        WindowedStream<WaterSensor, String, TimeWindow> windowDS = keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        //TODO 6.对窗口中数据进行聚合计算
        SingleOutputStreamOperator<WaterSensor> reduceDS = windowDS.reduce(
                new ReduceFunction<WaterSensor>() {
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        System.out.println("中间累加的结果：" + value1);
                        System.out.println("新来的数据：" + value2);
                        value1.setVc(value1.getVc() + value2.getVc());
                        return value1;
                    }
                }
        );
        //TODO 7.输出
        reduceDS.print("~~~");
        //TODO 8.提交作业
        env.execute();
    }
}
